#!/usr/bin/env bash
# torque/compute-execute -- Executes a process remotely using the Torque scheduler
# $ compute-execute input_sql=... command=... output_relation=...
#
# To limit the number of parallel processes, set the DEEPDIVE_NUM_PROCESSES
# environment or the 'deepdive.computers.local.num_processes' in
# computers.conf:
# $ export DEEPDIVE_NUM_PROCESSES=2
# $ compute-execute input_sql=... command=... output_relation=...
##
# Abort on any error, on use of an unset variable, and on a failure in any
# stage of a pipeline.
set -o errexit -o nounset -o pipefail

# Pull in the helpers shared by the compute drivers (resolved through PATH).
source commons

# Prefixes for the temporary and the to-be-dropped tables; values already
# present in the environment win.
: "${DEEPDIVE_PREFIX_TABLE_TEMPORARY:=dd_tmp_}"
: "${DEEPDIVE_PREFIX_TABLE_OLD:=dd_old_}"

# load compute configuration
# The generated assignments are captured before being evaluated: inside
# `eval "$(...)"` a failing jq2sh goes unnoticed because eval only ever sees
# the (possibly empty) output, whereas a plain assignment propagates the exit
# status to `set -e`.
computer_config_sh=$(jq2sh <<<"$DEEPDIVE_COMPUTER_CONFIG" \
    num_processes='.num_processes' \
    ssh_user='.ssh_user' \
    ssh_host='.ssh_host' \
    remote_deepdive_app_base='.remote_deepdive_app_base' \
    remote_deepdive_transient_base='.remote_deepdive_transient_base' \
    poll_period_secs='.poll_period_secs' \
    excludes='.excludes | join("\t")' \
    additional_paths='.additional_paths | join("\t")'
)
eval "$computer_config_sh"
# respect the DEEPDIVE_NUM_PROCESSES environment, then the configuration, and
# finally fall back to all but one of the local processors
num_processes=${DEEPDIVE_NUM_PROCESSES:-${num_processes:-$(nproc --ignore=1)}}

# declare all input arguments (each one is expected in NAME=VALUE form)
# A bare `declare --` would print every shell variable instead of declaring
# anything, so it is skipped when no arguments were given.
if (( $# > 0 )); then
    declare -- "$@"
fi
# input_sql and output_relation are optional; default them to empty so that
# later checks do not trip `set -u` when a caller omits them.
: "${input_sql:=}" "${output_relation:=}"

# Login used for every ssh/rsync call to the remote submission node.
export SSH_INFO="${ssh_user}@${ssh_host}"

# Values coming from a command are assigned first and exported afterwards:
# `export VAR=$(cmd)` reports the status of `export`, so a failing command
# would otherwise slip past `set -e` and leave the variable empty.
LOCAL_USER=$(whoami)
export LOCAL_USER

# Location of the DeepDive installation on the remote node.
REMOTE_DEEPDIVE_HOME=$(ssh "$SSH_INFO" "deepdive whereis installed .")
export REMOTE_DEEPDIVE_HOME

# App directory variables
# APP_ID is used to isolate app folders that are uploaded to the same path
# in the remote node. It is a hash of the local user name and the local absolute
# DeepDive application path.
# NOTE(review): `hash` is presumably a helper provided by `commons`; the bash
# builtin of the same name would print nothing -- confirm.
APP_ID=$(hash "$LOCAL_USER/$DEEPDIVE_APP")
export APP_ID
#SESSION_ID="$(date +%Y%m%d/%H%M%S.%N)"
# DEEPDIVE_RUN_ID is of the form 20160303/170011.599195000
export SESSION_ID="$DEEPDIVE_RUN_ID/$DEEPDIVE_CURRENT_PROCESS_NAME"

# The app base directory is the remote directory under which the local app
# directory is copied and kept in sync; APP_ID keeps different apps apart.
export REMOTE_DEEPDIVE_APP="$remote_deepdive_app_base/$APP_ID"
# Despite the _DIR suffix, these three are file paths inside the remote
# per-process run directory.
export REMOTE_SH_DIR="$REMOTE_DEEPDIVE_APP/run/$DEEPDIVE_CURRENT_PROCESS_NAME/remote.sh"
export REMOTE_ERR_DIR="$REMOTE_DEEPDIVE_APP/run/$DEEPDIVE_CURRENT_PROCESS_NAME/remote.err"
export REMOTE_JOBID_DIR="$REMOTE_DEEPDIVE_APP/run/$DEEPDIVE_CURRENT_PROCESS_NAME/job.id"

# A directory to store transient data, such as input and output, which we don't
# really want to keep in the workstation. Defaults to the remote app's run
# directory when the configuration leaves it empty.
remote_deepdive_transient_base=${remote_deepdive_transient_base:-"${REMOTE_DEEPDIVE_APP}/run"}
export REMOTE_DEEPDIVE_TRANSIENT="$remote_deepdive_transient_base/$SESSION_ID"
# TODO: Decoupling input to reduce data transfers.
export REMOTE_IN_DIR="$REMOTE_DEEPDIVE_TRANSIENT/remote.in"
export REMOTE_OUT_DIR="$REMOTE_DEEPDIVE_TRANSIENT/remote.out"

# Initial value only: it is recomputed from the local working directory
# before the app is uploaded.
export REMOTE_DEEPDIVE_CWD="$REMOTE_DEEPDIVE_APP/run/$DEEPDIVE_CURRENT_PROCESS_NAME"
# Path of the Kerberos credential cache on the remote node; `krb_dir` may
# override the default.
export KRB_DIR=${krb_dir:-"/tmp/krb5cc_${ssh_user}_deepdive"}

# Print the effective configuration and session identifiers, one item per
# line, so they end up in the process log.
printf '%s\n' \
    "Executing with the following configuration:" \
    " num_processes=$num_processes" \
    " ssh_user=$ssh_user" \
    " ssh_host=$ssh_host" \
    " remote_deepdive_app_base=$remote_deepdive_app_base" \
    " remote_deepdive_transient_base=$remote_deepdive_transient_base" \
    " poll_period_secs=$poll_period_secs" \
    " excludes=$excludes" \
    " additional_paths=$additional_paths" \
    "" \
    "DEEPDIVE_APP = $DEEPDIVE_APP" \
    "DEEPDIVE_CURRENT_PROCESS_NAME = $DEEPDIVE_CURRENT_PROCESS_NAME" \
    "DEEPDIVE_COMPUTER_TYPE = $DEEPDIVE_COMPUTER_TYPE" \
    "" \
    "Session information" \
    " application id : $LOCAL_USER/$DEEPDIVE_APP -> $APP_ID" \
    " session id     : $DEEPDIVE_CURRENT_PROCESS_NAME -> $SESSION_ID"

# Warn when the remote node has no valid Kerberos credential cache at
# $KRB_DIR. `klist -s` reports validity through its exit status only.
# The redirection is spelled `>/dev/null 2>&1` because `&>` is a bashism: a
# POSIX login shell on the remote side parses it as "run in the background,
# then redirect", which makes the check always succeed.
# Missing credentials only produce a warning; execution continues.
if ! ssh "$SSH_INFO" "klist -s -c $KRB_DIR >/dev/null 2>&1"; then
  echo "Cannot find Kerberos credentials at $KRB_DIR" >&2
  echo 'Please run `deepdive compute setup` to initialize Kerberos credentials.' >&2
fi

# Create the per-process run directory and the transient directory on the
# remote node before anything is copied there. The paths are expanded locally
# and the resulting script is handed to a remote bash as one quoted argument.
remote_process_dir="$REMOTE_DEEPDIVE_APP/run/$DEEPDIVE_CURRENT_PROCESS_NAME"
remote_mkdir_script="
  # Should already exist, but just to make sure.
  mkdir -p $remote_process_dir
  [[ -d $REMOTE_DEEPDIVE_TRANSIENT ]] || mkdir -p $REMOTE_DEEPDIVE_TRANSIENT
"
ssh $SSH_INFO bash -c "'$remote_mkdir_script'"

# Mirror the local working directory under the remote app directory by
# replacing the local app prefix with the remote one. The prefix is quoted so
# that glob characters in $DEEPDIVE_APP are matched literally.
DEEPDIVE_CWD="$(pwd)"
REMOTE_DEEPDIVE_CWD="$REMOTE_DEEPDIVE_APP${DEEPDIVE_CWD#"$DEEPDIVE_APP"}"

# Build the rsync --exclude options as an array, one pattern per element.
# $excludes is a tab-joined list, so it is split on tabs only and never goes
# through unquoted expansion: a string-built option list gets word-split and
# glob-expanded by the local shell (a pattern such as *.log would be replaced
# by matching local file names), and quotes embedded in such a string reach
# rsync as literal characters.
rsync_exclude_args=()
IFS=$'\t' read -r -a exclude_patterns <<<"${excludes-}" || :
for exclude in ${exclude_patterns[@]+"${exclude_patterns[@]}"}; do
  rsync_exclude_args+=(--exclude "$exclude")
done
# Kept as a plain string for display and for backward compatibility.
RSYNC_EXCLUDES="${rsync_exclude_args[*]-}"
echo "Copying deepdive app to remote node."
echo "Excluding $RSYNC_EXCLUDES"
# Slash at the end of DEEPDIVE_APP/ is important, so that rsync doesn't create
# another folder inside $REMOTE_DEEPDIVE_APP.
# The ${array[@]+...} form keeps an empty array safe under `set -u` on older
# bash versions.
rsync -aH ${rsync_exclude_args[@]+"${rsync_exclude_args[@]}"} --progress \
  "$DEEPDIVE_APP/" "$SSH_INFO:$REMOTE_DEEPDIVE_APP"

# Prepare input data
# When an input query is given, stream its result straight from the database
# into the remote input file, reporting progress along the way.
# input_sql is optional: ${input_sql:-} keeps the check from aborting under
# `set -u` when the caller did not pass it.
if [[ -n ${input_sql:-} ]]; then
  echo "Uploading SQL data to submission node.."
  deepdive-sql eval "$input_sql" format="$DEEPDIVE_LOAD_FORMAT" |
  show_progress input_to "$DEEPDIVE_CURRENT_PROCESS_NAME upload_sql" -- \
  ssh "$SSH_INFO" "cat > $REMOTE_IN_DIR"
fi

# Prepare submission script
# XXX there are conditional branches below depending on whether input_sql
# and/or output_relation is given, to support four use cases. Depending on
# the case, a different submission script needs to be generated.
# 1) executing command while streaming data from/to the database
# 2) input-only command which has no output to the database and streams from the database
# 3) output-only command which has no input from the database and streams to the database
# 4) database-independent command which simply runs in parallel
# XXX: Right now, the submission script doesn't handle the case when there's no
# input_sql.
echo "Performing setup for current task..."
# Runs `deepdive compute remote-taskinit` on the remote node through
# ssh_with_env (defined outside this file). NAME=value arguments carry an
# explicit value; bare NAMEs presumably forward the variable of that name from
# this script -- TODO confirm against ssh_with_env. Note that DEEPDIVE_APP is
# set to the *remote* app directory and that COMMAND is wrapped in literal
# double quotes.
ssh_with_env \
  DEEPDIVE_APP=$REMOTE_DEEPDIVE_APP \
  NUM_PROCESSES=$num_processes \
  REMOTE_ADDITIONAL_PATHS="$additional_paths" \
  COMMAND=\""$command"\" \
  REMOTE_DEEPDIVE_CWD \
  REMOTE_DEEPDIVE_HOME \
  DEEPDIVE_CURRENT_PROCESS_NAME \
  REMOTE_IN_DIR \
  REMOTE_OUT_DIR \
  REMOTE_ERR_DIR \
  REMOTE_SH_DIR \
  -- "deepdive compute remote-taskinit"

# Submit job.
# File descriptor 5 duplicates stdout so that tee can show the output of the
# submission while the same output is captured into JOB_ID. This is a hack
# that is supposed to surface ssh asking for the password, but that output
# seems not to be flushed to stdout.
# XXX: Is this a good idea??
exec 5>&1
JOB_ID=$(
  ssh_with_env \
    DEEPDIVE_APP=$REMOTE_DEEPDIVE_APP \
    SSH_HOST=$ssh_host \
    KRB_DIR \
    REMOTE_JOBID_DIR \
    REMOTE_SH_DIR \
    -- "deepdive compute remote-submit" |
    tee /dev/fd/5
)

# On TERM or INT, run sched_kill for the submitted job. The job id and the
# process count are expanded now, when the trap is installed.
trap "sched_kill $JOB_ID $num_processes" TERM INT
# Counts how many times a one-letter status code occurs in a status string.
#   $1 - status letter (I, C or F)
#   $2 - status string reported by the remote node
# Prints the count on stdout.
_count_job_status() {
  local letter=$1 statuses=$2
  # Drop every character that is not the requested letter; what remains has
  # one character per occurrence.
  local matches=${statuses//[!$letter]/}
  echo "${#matches}"
}

# Maps job counts to the overall outcome of one polling round.
#   $1 - number of failed jobs
#   $2 - number of incomplete jobs
# Prints "failed" if any job failed, otherwise "complete" if no job is left
# incomplete, otherwise "incomplete". Failure takes precedence so that a run
# in which every job has finished but some failed is not reported as complete.
_poll_outcome() {
  local num_failed=$1 num_incomplete=$2
  if (( num_failed > 0 )); then
    echo failed
  elif (( num_incomplete == 0 )); then
    echo complete
  else
    echo incomplete
  fi
}

# Poll status
echo "Waiting for job id $JOB_ID to complete"
STATUS=incomplete
while [[ "$STATUS" == incomplete ]]; do
  echo "Waiting for $poll_period_secs seconds..."
  sleep "$poll_period_secs"

  # Efficient way to compute the status as it lifts the burden of checking each
  # job in the submission node. fd 5 duplicates stdout so that tee can show the
  # remote output while it is being captured.
  exec 5>&1
  JOB_STATUSES=$(ssh_with_env \
    DEEPDIVE_APP=$REMOTE_DEEPDIVE_APP \
    JOB_ID=$JOB_ID \
    NUM_PROCESSES=$num_processes \
    REMOTE_OUT_DIR \
    -- "deepdive compute remote-status" | tee /dev/fd/5)

  echo "Job statuses [I:incomplete, C:complete, F:failed]: "
  echo "  $JOB_STATUSES"
  NUM_COMPLETED=$(_count_job_status C "$JOB_STATUSES")
  NUM_INCOMPLETE=$(_count_job_status I "$JOB_STATUSES")
  NUM_FAILED=$(_count_job_status F "$JOB_STATUSES")

  # Sanity check: every process must be accounted for by exactly one status.
  if [[ $((NUM_FAILED + NUM_INCOMPLETE + NUM_COMPLETED)) != "$num_processes" ]]; then
    error "Total number of jobs != total number of processes"
  fi

  STATUS=$(_poll_outcome "$NUM_FAILED" "$NUM_INCOMPLETE")
done

# Local file that collects the remote error logs of this session.
export ERROR_PATH="$DEEPDIVE_APP/run/$SESSION_ID/error.log"
mkdir -p "$(dirname "$ERROR_PATH")"

# Downloads error logs and other information from the remote node: syncs the
# remote app directory back (skipping paths matching the session id) and
# concatenates every remote error file, with `tail` headers, into $ERROR_PATH.
# Returns non-zero if either step fails.
_fetch_remote_logs() {
    rsync -aH --exclude "$SESSION_ID" "$SSH_INFO:$REMOTE_DEEPDIVE_APP/" "$DEEPDIVE_APP" &&
    ssh "$SSH_INFO" "tail -n +1 $REMOTE_ERR_DIR*" >"$ERROR_PATH"
}

if [[ "$STATUS" == failed ]]; then
    echo "Job failed!"
    # Fetch the logs *before* aborting: the message below points the user at
    # $ERROR_PATH, which is only populated by this download. A download
    # problem must not hide the failure report, hence best effort here.
    _fetch_remote_logs ||
        echo "Warning: could not download the remote error logs" >&2
    echo "Please inspect $ERROR_PATH for more information"

    if [[ $num_processes -gt 1 ]]; then
        echo "--- Multi-job report ---"
        echo "Last job statuses: $JOB_STATUSES"
        echo "Completed jobs: $NUM_COMPLETED"
        echo "Failed jobs: $NUM_FAILED"
        echo "Incomplete jobs: $NUM_INCOMPLETE"
    fi
    error
    # Safeguard in case `error` (defined outside this file) returns success:
    # a failed job must never go on to replace the output relation.
    exit 1
fi

# Successful run: a failed download aborts the script as before.
_fetch_remote_logs

# Load the remote output into the database when output_relation is given.
# The data goes into a fresh temporary table which then replaces the output
# relation, so the relation is never truncated in place.
# output_relation is optional: ${output_relation:-} keeps the check from
# aborting under `set -u` when the caller did not pass it.
if [[ -n ${output_relation:-} ]]; then
    # some derived values
    output_relation_tmp="${DEEPDIVE_PREFIX_TABLE_TEMPORARY}${output_relation}"

    # show configuration
    echo " output_relation_tmp=$output_relation_tmp"
    echo

    # use an empty temporary table as a sink instead of TRUNCATE'ing the output_relation
    deepdive-create table-if-not-exists "$output_relation"
    db-create-table-like "$output_relation_tmp" "$output_relation"

    # With several processes the remote output is spread over files named
    # $REMOTE_OUT_DIR-*; the glob is expanded by the remote shell.
    if [[ $num_processes -gt 1 ]]; then
        remote_output="$REMOTE_OUT_DIR-*"
    else
        echo "Single process!!"
        remote_output="$REMOTE_OUT_DIR"
    fi
    echo "ssh $SSH_INFO \"cat $remote_output\""

    # Stream the output from the remote submission node directly into
    # deepdive load. ssh is invoked directly rather than through `eval` on a
    # string-built command, which removes a second round of local parsing.
    ssh "$SSH_INFO" "cat $remote_output" |

    # load the output data to the temporary table in the database
    # XXX hiding default progress bar from deepdive-load
    # TODO abbreviate this env into a show_progress option, e.g., recursive=false
    show_progress input_to "$DEEPDIVE_CURRENT_PROCESS_NAME output" -- \
    env DEEPDIVE_SHOW_PROGRESS=false \
    deepdive-load "$output_relation_tmp" /dev/stdin

    # rename the new temporary table
    # TODO maybe use PostgreSQL's schema support here?
    echo "Replacing $output_relation with $output_relation_tmp"
    output_relation_old="${DEEPDIVE_PREFIX_TABLE_OLD}${output_relation}"
    # Dropping a leftover old table is best effort on purpose.
    deepdive-sql "DROP TABLE IF EXISTS ${output_relation_old};" || true
    deepdive-sql "ALTER TABLE ${output_relation}     RENAME TO ${output_relation_old};"
    deepdive-sql "ALTER TABLE ${output_relation_tmp} RENAME TO ${output_relation};"
    deepdive-sql "DROP TABLE IF EXISTS ${output_relation_old};" || true
    # and analyze the table to speed up future queries
    db-analyze "${output_relation}"
fi
